package realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import realtime.app.func.BeanToJsonStrFunction;
import realtime.beans.CartAddUuBean;
import realtime.util.DateFormatUtil;
import realtime.util.DorisUtil;
import realtime.util.EnvUtil;
import realtime.util.KafkaUtil;

public class DwsTradeCartAddUuWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = EnvUtil.getSEE(4);
        String topic = "dwd_trade_cart_add";
        String groupId = "dwd_trade_cart_add_group";
        KafkaSource<String> kafkaSource = KafkaUtil.getKafkaConsumer(topic, groupId);
        DataStreamSource<String> kafkaStrDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source");
        //把流中数据转换为JSONObj
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaStrDS.map(JSON::parseObject);
        //指定水位线
        SingleOutputStreamOperator<JSONObject> watermarkDS = jsonObjDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject jsonObject, long l) {
                                return jsonObject.getLong("ts") * 1000;
                            }
                        })
        );
        //按照用户id分组
        KeyedStream<JSONObject, String> keyedDS = watermarkDS.keyBy(v1 -> v1.getString("user_id"));
        //对流中数据进行处理并转换为实体类
        SingleOutputStreamOperator<JSONObject> processDS = keyedDS.process(new KeyedProcessFunction<String, JSONObject, JSONObject>() {
            private ValueState<String> lastCartDateState;

            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("lastCartDateState", String.class);
                valueStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(1)).build());
                lastCartDateState = getRuntimeContext().getState(valueStateDescriptor);
            }

            @Override
            public void processElement(JSONObject jsonObj, KeyedProcessFunction<String, JSONObject, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
                String lastCartDate = lastCartDateState.value();
                Long ts = jsonObj.getLong("ts") * 1000;
                String curDate = DateFormatUtil.toDate(ts);
                if (StringUtils.isEmpty(lastCartDate) || !lastCartDate.equals(curDate)) {
                    out.collect(jsonObj);
                    lastCartDateState.update(curDate);
                }
            }
        });
        //开窗
        AllWindowedStream<JSONObject, TimeWindow> windowDS = keyedDS.windowAll(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)));
        //聚合
        SingleOutputStreamOperator<CartAddUuBean> aggregateDS = windowDS.aggregate(
                new AggregateFunction<JSONObject, Long, Long>() {
                    @Override
                    public Long createAccumulator() {
                        return 0L;
                    }

                    @Override
                    public Long add(JSONObject jsonObject, Long aLong) {
                        return ++aLong;
                    }

                    @Override
                    public Long getResult(Long aLong) {
                        return aLong;
                    }

                    @Override
                    public Long merge(Long aLong, Long acc1) {
                        return null;
                    }
                },
                new AllWindowFunction<Long, CartAddUuBean, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<Long> values, Collector<CartAddUuBean> out) throws Exception {
                        String stt = DateFormatUtil.toYmdHms(window.getStart());
                        String edt = DateFormatUtil.toYmdHms(window.getEnd());

                        //没有分区,手动添加分区或者修改分区时间
                        long timeMillis = System.currentTimeMillis();
                        String curDate = DateFormatUtil.toDate(timeMillis);

                        for (Long value : values) {
                            out.collect(
                                    new CartAddUuBean(
                                            stt,
                                            edt,
                                            curDate,
                                            value
                                    )
                            );
                        }
                    }
                }
        );
        //aggregateDS.print(">>>>");
        aggregateDS
                .map(new BeanToJsonStrFunction<>())
                .sinkTo(DorisUtil.getDorisSink("dws_trade_cart_add_uu_window"));

        env.execute();
    }
}
